fix: add a timeout to RedisStreamBroker xautoclaim lock to prevent infinite locking #107
Conversation
(force-pushed from f03a4a3 to 3b690f2)
@s3rius is something missing to merge this one? We have prod issues happening again and again because of this.
(force-pushed from e708799 to 4cd0d63)
Anyone? @danfimov @spikeninja?
Please add a test to test_broker.py:

```python
@pytest.mark.anyio
async def test_unacknowledged_lock_timeout_in_stream_broker(
    redis_url: str,
    valid_broker_message: BrokerMessage,
) -> None:
    unacknowledged_lock_timeout = 1
    queue_name = uuid.uuid4().hex
    consumer_group_name = uuid.uuid4().hex
    broker = RedisStreamBroker(
        url=redis_url,
        approximate=False,
        queue_name=queue_name,
        consumer_group_name=consumer_group_name,
        unacknowledged_lock_timeout=unacknowledged_lock_timeout,
    )
    await broker.startup()
    await broker.kick(valid_broker_message)
    message = await get_message(broker)
    assert isinstance(message, AckableMessage)
    assert message.data == valid_broker_message.message
    async with Redis(connection_pool=broker.connection_pool) as redis:
        lock_key = f"autoclaim:{consumer_group_name}:{queue_name}"
        # The lock should be held while the message is unacknowledged.
        assert await redis.exists(lock_key) == 1
        await asyncio.sleep(unacknowledged_lock_timeout + 0.5)
        lock_exists_after_timeout = await redis.exists(lock_key)
        assert lock_exists_after_timeout == 0, "Lock should be released after timeout"
    await broker.shutdown()
```
(force-pushed from 4cd0d63 to 500ed50)
Unfortunately your test doesn't cover the issue. Reproducing the bug is quite complex: concurrent brokers, a crash, etc.
It doesn't need to cover your exact bug; it should at least cover the existence of the new field and check that the lock is properly acquired and released.
Feel free to add your own tests, as I've mentioned here.
(force-pushed from 500ed50 to 04836a6)
(force-pushed from 04836a6 to e997423)
Nullable typing + basic test ✅
Thx a lot @spikeninja 🙏 Do you think you can release soon?
https://github.com/taskiq-python/taskiq-redis/releases/tag/1.2.2
If for any reason the worker crashes or gets killed while holding the lock, the lock is never released. This happened to us when a message made the container run out of memory, killing the worker immediately so the lock was never released.
This PR adds a timeout so that the lock is at least released at some point.
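The underlying pattern is the classic Redis lock-with-expiry: set the lock key with `NX` (only if absent) and `EX` (a TTL), so that even if the holder dies, Redis itself expires the key. Here is a minimal, self-contained sketch of that idea; the `FakeRedis` stand-in, the function name, and the key format are illustrative assumptions for this example, not the actual taskiq-redis implementation.

```python
import time


class FakeRedis:
    """Minimal in-memory stand-in for a Redis client (illustration only)."""

    def __init__(self):
        # key -> (value, absolute expiry time or None)
        self._data = {}

    def set(self, key, value, nx=False, ex=None):
        self._purge(key)
        if nx and key in self._data:
            return None  # NX: key already exists, lock is held
        expiry = time.monotonic() + ex if ex else None
        self._data[key] = (value, expiry)
        return True

    def exists(self, key):
        self._purge(key)
        return 1 if key in self._data else 0

    def _purge(self, key):
        # Drop the key if its TTL has elapsed, mimicking Redis expiry.
        entry = self._data.get(key)
        if entry and entry[1] is not None and time.monotonic() >= entry[1]:
            del self._data[key]


def try_acquire_autoclaim_lock(redis, consumer_group, queue, timeout):
    """Try to acquire the per-group autoclaim lock.

    The TTL (`ex=timeout`) guarantees Redis releases the lock even if
    the worker holding it crashes or is OOM-killed.
    """
    lock_key = f"autoclaim:{consumer_group}:{queue}"
    # SET key value NX EX timeout: set only if absent, with a TTL.
    return redis.set(lock_key, "locked", nx=True, ex=timeout) is not None
```

With this, a second acquisition attempt fails while the lock is live, but succeeds again once the TTL has expired, regardless of whether the original holder ever released it.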